Maintain replication connection between sync flows #1211

Merged 3 commits into main from spiritus-mundi on Feb 22, 2024

Conversation

serprex (Contributor) commented Feb 6, 2024

Currently we reconnect with each sync flow, which means starting replication again every time. This can take an exceedingly long time for some workloads on some databases.

Fix: use a Temporal session to share state between activities, use a single source connector throughout the CDC flow, and move the replication connection back into the source connection.

@serprex serprex force-pushed the spiritus-mundi branch 3 times, most recently from c936149 to 772def2 Compare February 7, 2024 04:40
serprex added a commit that referenced this pull request Feb 7, 2024
…anceled (#1214)

Thinking that in #1211 the workflow is exiting with an error,
which we're then ignoring, letting the test pass but cleanup fail
@serprex serprex force-pushed the spiritus-mundi branch 3 times, most recently from c00b723 to e14b538 Compare February 7, 2024 17:01
@serprex serprex marked this pull request as ready for review February 7, 2024 18:54
@serprex serprex requested a review from iskakaushik February 7, 2024 18:54
@serprex serprex changed the title from "Use a single source connector per cdc flow to avoid repeatedly reconnecting START REPLICATION" to "Maintain replication connection between sync flows" Feb 7, 2024
serprex (Contributor, Author) commented Feb 7, 2024

Kevin pointed out a problem: with non-parallel sync-normalize, a long normalize leaves the connection silent for too long, and Postgres can drop it. We need keepalive logic. MaintainPull could work, but then we need to synchronize MaintainPull and StartFlow.
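One way to picture the synchronization this comment asks for is the minimal sketch below. It uses only the standard library and assumes a keepalive loop and the sync path share a mutex, so a keepalive is skipped while a pull holds the connection. The `replConn` type, its `ping` and `pull` methods, and the intervals are hypothetical stand-ins, not the PR's actual MaintainPull or StartFlow code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// replConn is a hypothetical stand-in for a shared replication connection.
type replConn struct {
	mu    sync.Mutex // serializes keepalives and pulls on the one connection
	pings int        // keepalives actually sent
	pulls int        // completed pulls
}

// ping sends a keepalive unless a pull currently holds the connection.
// It reports whether the keepalive was sent.
func (c *replConn) ping() bool {
	if !c.mu.TryLock() {
		return false // a pull is active, so the connection is not idle
	}
	defer c.mu.Unlock()
	c.pings++
	return true
}

// pull simulates a sync flow reading a batch while holding the connection.
func (c *replConn) pull(work func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	work()
	c.pulls++
}

// maintain attempts a keepalive on every tick until ctx is cancelled.
func maintain(ctx context.Context, c *replConn, every time.Duration) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			c.ping()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &replConn{}
	done := make(chan struct{})
	go func() {
		maintain(ctx, c, 5*time.Millisecond)
		close(done)
	}()

	c.pull(func() { time.Sleep(20 * time.Millisecond) }) // sync phase
	time.Sleep(30 * time.Millisecond)                    // idle phase, e.g. a long normalize

	cancel()
	<-done // maintain has exited, so the counters are safe to read
	fmt.Println("pulls:", c.pulls, "keepalives sent:", c.pings > 0)
}
```

The sketch skips a keepalive rather than blocking on the mutex, on the assumption that a connection busy with a pull does not need one.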

@serprex serprex force-pushed the spiritus-mundi branch 3 times, most recently from a395c50 to 7cdb75b Compare February 8, 2024 16:05
@@ -45,9 +45,10 @@ type SlotSnapshotSignal struct {
type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCacheRw sync.RWMutex
Contributor


Maybe have a replication connection manager struct that takes care of:

  1. This map and locking.
  2. Keeping track of the connection health and lifecycle.
  3. Any other additional metadata pertaining to the replication connection.

serprex (Contributor, Author) commented Feb 9, 2024

Tracking connection health & lifecycle is part of MaintainPull, which needs to exist either way to keep heartbeating the session. Additional metadata belongs in the connector, where we avoid contention on CdcCacheRw.

I could see moving replState/replConn out of the connector, then storing a struct { replState, replConn, connector } as the value of the hashmap. But for now, putting it all in the connector works about the same.
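For illustration only, the alternative layout discussed in this thread could look roughly like the sketch below: a map guarded by an RWMutex whose values bundle the connector with its replication state. All names here (`cdcCache`, `cdcEntry`, and the string and int64 stand-ins for the real connector, connection and state types) are hypothetical and are not taken from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// cdcEntry is a hypothetical cache value bundling everything tied to one flow.
type cdcEntry struct {
	connector string // stand-in for the source connector
	replConn  string // stand-in for the replication connection
	replState int64  // stand-in for replication state, e.g. a last offset
}

// cdcCache maps a flow name to its entry; mu plays the role of CdcCacheRw.
type cdcCache struct {
	mu      sync.RWMutex
	entries map[string]*cdcEntry
}

func newCdcCache() *cdcCache {
	return &cdcCache{entries: make(map[string]*cdcEntry)}
}

// get looks up a flow's entry under the read lock.
func (c *cdcCache) get(flow string) (*cdcEntry, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e, ok := c.entries[flow]
	return e, ok
}

// put registers an entry and reports false if the flow already has one.
func (c *cdcCache) put(flow string, e *cdcEntry) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, exists := c.entries[flow]; exists {
		return false
	}
	c.entries[flow] = e
	return true
}

// remove drops a flow's entry, e.g. when its session ends.
func (c *cdcCache) remove(flow string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, flow)
}

func main() {
	cache := newCdcCache()
	cache.put("flow_a", &cdcEntry{connector: "pg-source", replConn: "conn-1"})

	if e, ok := cache.get("flow_a"); ok {
		// Per-flow state lives on the entry, so updating it does not need
		// the cache-wide lock (single goroutine here for simplicity).
		e.replState = 42
		fmt.Println(e.connector, e.replConn, e.replState)
	}

	cache.remove("flow_a")
	_, ok := cache.get("flow_a")
	fmt.Println("still cached:", ok)
}
```

Either way, the cache-wide lock is held only for map access, which matches the goal of avoiding contention on CdcCacheRw.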

@serprex serprex mentioned this pull request Feb 9, 2024
iskakaushik pushed a commit that referenced this pull request Feb 10, 2024
This required updating the connector interfaces. Having `ctx` everywhere is a
bit annoying, but it's ultimately the correct way. I was running into context
complications in #1211 with the connector being shared between activities

Putting context in struct essentially makes that struct a context, but
this is not the context we necessarily want. For more context, see
https://zenhorace.dev/blog/context-control-go

Some changes were made:
1. GetCatalog takes a context now instead of using
`context.Background()`
2. eventhubs processBatch now takes context instead of using
`context.Background()`
3. many instances of `Query`/`Exec` in snowflake/clickhouse converted to
`QueryContext`/`ExecContext`
4. got rid of cancel context in ssh tunnel, context being passed in is
sufficient

Followup to #1238
@serprex serprex force-pushed the spiritus-mundi branch 6 times, most recently from 7558024 to ff908e5 Compare February 15, 2024 19:19
@serprex serprex force-pushed the spiritus-mundi branch 3 times, most recently from 4ccc9dd to 7583072 Compare February 16, 2024 13:33
iskakaushik (Contributor) commented

Does using OriginalRunID over a UUID have any unexpected consequences? If we were to reset the state of the workflow, would that recreate the child workflow, or would it complain that the run ID matches?

serprex (Contributor, Author) commented Feb 16, 2024

A recreated workflow will have a new RunID. OriginalRunID is used because RunID changes with replays; OriginalRunID maintains determinism.

https://pkg.go.dev/go.temporal.io/sdk/internal#WorkflowInfo

FirstRunID is the one that stays consistent across ContinueAsNew.

@serprex serprex force-pushed the spiritus-mundi branch 2 times, most recently from 92a9064 to 9010c35 Compare February 20, 2024 18:34
serprex added a commit that referenced this pull request Feb 20, 2024
Less side effects, less error handling, can correlate different workflows with same run id

Also pull in some other cleanup from #1211
serprex added a commit that referenced this pull request Feb 21, 2024
Only pass config & options to StartFlow, removing StartFlowInput

In fact, while we're at it, rename StartFlow to SyncFlow,
the name doesn't really make sense anymore,
& it'll make less sense after #1211
@iskakaushik iskakaushik merged commit 8f4ad4e into main Feb 22, 2024
7 checks passed
@iskakaushik iskakaushik deleted the spiritus-mundi branch February 22, 2024 13:54